<?php

namespace app\common\command;

use app\common\service\ProcessService;
use app\common\service\QueueService;
use think\Console;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use think\Db;
use think\Log;

/**
 * 异步任务管理指令
 * Class Queue
 * @package think\admin\command
 */
class Queue extends Command
{

    /**
     * 任务编号
     * @var string
     */
    protected $code;

    /**
     * 绑定数据表
     * @var string
     */
    protected $table = 'SystemQueue';


    protected $process;
    protected $queue;

    private $processTitle;

    public function __construct($name = null)
    {
        $this->process = new ProcessService();
        $this->queue = new QueueService();

        parent::__construct($name);
    }

    /**
     * 配置指令参数
     */
    public function configure()
    {
        $this->setName('api:queue');
        $this->addArgument('action', Argument::OPTIONAL, 'stop|start|status|query|listen|clean|dorun|webstop|webstart|webstatus', 'listen');
        $this->addArgument('code', Argument::OPTIONAL, 'Taskcode');
        $this->addArgument('spts', Argument::OPTIONAL, 'Separator');
        $this->addOption('host', '-H', Option::VALUE_OPTIONAL, 'The host of WebServer.');
        $this->addOption('port', '-p', Option::VALUE_OPTIONAL, 'The port of WebServer.');
        $this->addOption('daemon', 'd', Option::VALUE_NONE, 'Run the queue listen in daemon mode');
        $this->setDescription('项目任务队列');
    }

    /**
     * 执行指令内容
     * @param Input $input
     * @param Output $output
     * @return mixed
     */
    public function execute(Input $input, Output $output)
    {
        $action = $this->input->hasOption('daemon') ? 'start' : $input->getArgument('action');
        if (method_exists($this, $method = "{$action}Action")) return $this->$method();
        $this->output->error("Wrong operation, Allow stop|start|status|query|listen|clean|dorun|webstop|webstart|webstatus");
    }

    /**
     * 停止 WebServer 调试进程
     */
    protected function webStopAction()
    {
        $root = "{$this->app->getRootPath()}public" . DIRECTORY_SEPARATOR;
        if (count($result = $this->process->query("-t {$root} {$root}router.php")) < 1) {
            $this->output->writeln("There are no WebServer processes to stop");
        } else foreach ($result as $item) {
            $this->process->close($item['pid']);
            $this->output->writeln("Successfully sent end signal to process {$item['pid']}");
        }
    }

    /**
     * 启动 WebServer 调试进程
     */
    protected function webStartAction()
    {
        $port = $this->input->getOption('port') ?: '80';
        $host = $this->input->getOption('host') ?: '127.0.0.1';
        $root = "{$this->app->getRootPath()}public" . DIRECTORY_SEPARATOR;
        $command = "php -S {$host}:{$port} -t {$root} {$root}router.php";
        if (count($result = $this->process->query($command)) > 0) {
            if ($this->process->iswin()) {
                $this->process->exec("start http://{$host}:{$port}");
            }
            $this->output->writeln("WebServer process already exist for pid {$result['0']['pid']}");
        } else {
            [$this->process->create($command), usleep(2000)];
            if (count($result = $this->process->query($command)) > 0) {
                $this->output->writeln("WebServer process started successfully for pid {$result['0']['pid']}");
                if ($this->process->iswin()) {
                    $this->process->exec("start http://{$host}:{$port}");
                }
            } else {
                $this->output->writeln('WebServer process failed to start');
            }
        }
    }

    /**
     * 查看 WebServer 调试进程
     */
    protected function webStatusAction()
    {
        $root = "{$this->app->getRootPath()}public" . DIRECTORY_SEPARATOR;
        if (count($result = $this->process->query("-t {$root} {$root}router.php")) > 0) {
            $this->output->info("WebServer process {$result[0]['pid']} running");
            $this->output->write("># {$result[0]['cmd']}");
        } else {
            $this->output->warning("The WebServer process is not running");
        }
    }

    /**
     * 停止所有任务
     */
    protected function stopAction()
    {
        $keyword = $this->process->think('api:queue');
        if (count($result = $this->process->query($keyword)) < 1) {
            $this->output->writeln("There are no task processes to stop");
        } else foreach ($result as $item) {
            $this->process->close($item['pid']);
            $this->output->writeln("Successfully sent end signal to process {$item['pid']}");
        }
    }

    /**
     * 启动后台任务
     */
    protected function startAction()
    {
        Db::name($this->table)->count();
        $command = $this->process->think("api:queue listen");

        if (count($result = $this->process->query($command)) > 0) {
            $this->output->writeln("Asynchronous daemons already exist for pid {$result[0]['pid']}");
        } else {
            [$this->process->create($command), usleep(1000)];
            if (count($result = $this->process->query($command)) > 0) {
                $this->output->writeln("Asynchronous daemons started successfully for pid {$result[0]['pid']}");
            } else {
                $this->output->writeln("Asynchronous daemons failed to start");
            }
        }
    }

    /**
     * 查询所有任务
     */
    protected function queryAction()
    {
        $result = $this->process->query($this->process->think("api:queue"));
        if (count($result) > 0) foreach ($result as $item) {
            $this->output->writeln("{$item['pid']}\t{$item['cmd']}");
        } else {
            $this->output->writeln('No related task process found');
        }
    }

    /**
     * 清理所有任务
     * @throws \think\admin\Exception
     * @throws \think\db\exception\DataNotFoundException
     * @throws \think\db\exception\DbException
     * @throws \think\db\exception\ModelNotFoundException
     */
    protected function cleanAction()
    {
        // 清理 7 天前的历史任务记录
        $map['exec_time'] = ['<', time() - 7 * 24 * 3600];
        $clear = Db::name($this->table)->where($map)->delete();
        $this->setQueueProgress("本次清理了 {$clear} 条历史任务记录");
        // 标记超过 1 小时未完成的任务为失败状态，循环任务失败重置
        $map1['loops_time'] = ['>', 0];
        $map1['status'] = 4; // 执行失败的循环任务
        $map2['exec_time'] = ['<', time() - 3600];
        $map2['status'] = 2; // 执行超时的任务
        $timeout = 0;
        $loops = 0;
        $total = Db::name($this->table)->where($map1)->whereOr($map2)->count();
        Db::name($this->table)->where($map1)->whereOr($map2)->chunk(100, function ($result) use ($total, &$loops, &$timeout) {
            foreach ($result as $item) {
                $item['loops_time'] > 0 ? $loops++ : $timeout++;
                $prefix = str_pad($timeout + $loops, strlen("{$total}"), '0', STR_PAD_LEFT);
                if ($item['loops_time'] > 0) {
                    $this->setQueueProgress("[{$prefix}/{$total}] 正在重置任务 {$item['code']} 为运行", ($timeout + $loops) * 100 / $total);
                    $status = 1;
                    $message = intval($item['status']) === 4 ? '任务执行失败，已自动重置任务！' : '任务执行超时，已自动重置任务！';
                } else {
                    $this->setQueueProgress("[{$prefix}/{$total}] 正在标记任务 {$item['code']} 为超时", ($timeout + $loops) * 100 / $total);
                    $status = 4;
                    $message = '任务执行超时，已自动标识为失败！';
                }
                Db::name($this->table)->where(['id' => $item['id']])->update(['status' => $status, 'exec_desc' => $message]);
            }
        });
        $this->setQueueSuccess("清理 {$clear} 条历史任务，关闭 {$timeout} 条超时任务，重置 {$loops} 条循环任务");
    }

    /**
     * 查询兼听状态
     */
    protected function statusAction()
    {
        $command = $this->process->think('api:queue listen');
        if (count($result = $this->process->query($command)) > 0) {
            $this->output->info("Listening for main process {$result[0]['pid']} running");
        } else {
            $this->output->warning("The Listening main process is not running");
        }
    }

    /**
     * 立即监听任务
     */
    protected function listenAction()
    {
        set_time_limit(0);
//        Db::setLog(new NullLogger());
        Db::name($this->table)->count();
        if ($this->process->iswin()) {
            $this->setProcessTitle("ThinkAdmin Queue Listen");
        }
        $this->output->writeln("\tYou can exit with <info>`CTRL-C`</info>");
        $this->output->writeln('============== LISTENING ==============');
        while (true) {
            Db::name($this->table)->where('status', 1)->where('exec_time', '<=', time())->chunk(100, function ($result) {
                foreach ($result as $vo) try {
                    $command = $this->process->think("api:queue dorun {$vo['code']} -");
                    if (count($this->process->query($command)) > 0) {
                        $this->output->writeln("Already in progress -> [{$vo['code']}] {$vo['title']}");
                    } else {
                        $this->process->create($command);
                        $this->output->writeln("Created new process -> [{$vo['code']}] {$vo['title']}");
                    }
                } catch (\Exception $exception) {
                    Db::name($this->table)->where(['code' => $vo['code']])->update([
                        'status' => '4', 'outer_time' => time(), 'exec_desc' => $exception->getMessage(),
                    ]);
                    $this->output->error("Execution failed -> [{$vo['code']}] {$vo['title']}，{$exception->getMessage()}");
                }
            });
            usleep(500000);
        }
    }


    /**
     * 执行任务内容
     * @throws \think\db\exception\DbException
     */
    protected function doRunAction()
    {
        set_time_limit(0);
        $this->code = trim($this->input->getArgument('code'));

        if (empty($this->code)) {
            $this->output->error('Task number needs to be specified for task execution');
        } else try {
            $this->queue->initialize($this->code);
            if (empty($this->queue->record) || intval($this->queue->record['status']) !== 1) {
                // 这里不做任何处理（该任务可能在其它地方已经在执行）
                $this->output->warning($message = "The or status of task {$this->code} is abnormal");
            } else {
                // 锁定任务状态，防止任务再次被执行
                Db::name($this->table)->strict(false)->where(['code' => $this->code])->update([
                    'enter_time' => microtime(true), 'attempts' => Db::raw('attempts+1'),
                    'outer_time' => '0', 'exec_pid' => getmypid(), 'exec_desc' => '', 'status' => '2',
                ]);

                $this->queue->progress(2, '>>> 任务处理开始 <<<', 0);
                // 执行任务内容
                defined('WorkQueueCall') or define('WorkQueueCall', true);
                defined('WorkQueueCode') or define('WorkQueueCode', $this->code);

                // 自定义指令，不支持返回消息（支持异常结束，异常码可选择 3|4 设置任务状态）
                $attr = explode(' ', trim(preg_replace('|\s+|', ' ', $this->queue->record['command'])));
                $this->updateQueue(3, Console::call(array_shift($attr), $attr)->fetch(), false);

            }
        } catch (\Exception $exception) {
            $code = $exception->getCode();
            if (intval($code) !== 3) $code = 4;
            $this->updateQueue($code, $exception->getMessage());
        } catch (\Error $exception) {
            $code = $exception->getCode();
            if (intval($code) !== 3) $code = 4;
            $this->updateQueue($code, $exception->getMessage());
        } catch (\Throwable $exception) {
            $code = $exception->getCode();
            if (intval($code) !== 3) $code = 4;
            $this->updateQueue($code, $exception->getMessage());
        }
    }

    /**
     * 修改当前任务状态
     * @param integer $status 任务状态
     * @param string $message 消息内容
     * @param boolean $issplit 是否分隔
     */
    protected function updateQueue($status, $message, $issplit = true)
    {
        // 更新当前任务
        $info = trim(is_string($message) ? $message : '');
        $desc = $issplit ? explode("\n", $info) : [$message];
        Db::name($this->table)->strict(false)->where(['code' => $this->code])->update([
            'status' => $status, 'outer_time' => microtime(true), 'exec_pid' => getmypid(), 'exec_desc' => $desc[0],
        ]);
        $this->output->writeln(is_string($message) ? $message : '');
        // 任务进度标记
        if (!empty($desc[0])) {
            $this->queue->progress($status, ">>> {$desc[0]} <<<");
        }
        if ($status == 3) {
            $this->queue->progress($status, '>>> 任务处理完成 <<<', 100);
        } elseif ($status == 4) {
            $this->queue->progress($status, '>>> 任务处理失败 <<<');
        }
        // 注册循环任务
        if (isset($this->queue->record['loops_time']) && $this->queue->record['loops_time'] > 0) {
            try {
                $this->queue->initialize($this->code)->reset($this->queue->record['loops_time']);
            } catch (\Exception $exception) {
                Log::error("Queue {$this->queue->record['code']} Loops Failed. {$exception->getMessage()}");
            } catch (\Error $exception) {
                Log::error("Queue {$this->queue->record['code']} Loops Failed. {$exception->getMessage()}");
            }
        }
    }

    /**
     * 设置进程名称
     *
     * PHP 5.5+ or the proctitle PECL library is required
     *
     * @param string $title The process title
     *
     * @return $this
     */
    public function setProcessTitle($title)
    {
        $this->processTitle = $title;

        return $this;
    }
}